From 15a4ed66e9f3b547ff4ce9baf96170897edddce4 Mon Sep 17 00:00:00 2001 From: vramesha Date: Wed, 11 Feb 2026 11:23:29 +0530 Subject: [PATCH 1/3] Optimize query logging performance (#5243) Detailed Changes: - Introduced LogBuffer and LogBufferThreadContext to implement per-thread buffering, reducing lock contention. - Replaced localtime() with localtime_r() for improved performance and thread safety during timestamp generation. - Implemented configurable sampling for event logs to reduce I/O overhead. - Added global configuration variables to control flush size, flush interval, and sampling rate for event and audit logs. --- include/MySQL_Logger.hpp | 104 +++++++++++----- include/MySQL_Thread.h | 5 + include/PgSQL_Logger.hpp | 24 +++- include/PgSQL_Thread.h | 5 + include/log_utils.h | 208 +++++++++++++++++++++++++++++++ include/proxysql_structs.h | 20 +++ lib/Makefile | 2 +- lib/MySQL_Logger.cpp | 243 ++++++++++++++++++++++++++----------- lib/MySQL_Thread.cpp | 79 ++++++++++++ lib/PgSQL_Logger.cpp | 214 +++++++++++++++++++++++--------- lib/PgSQL_Thread.cpp | 79 ++++++++++++ lib/ProxySQL_Admin.cpp | 4 + lib/log_utils.cpp | 147 ++++++++++++++++++++++ 13 files changed, 973 insertions(+), 161 deletions(-) create mode 100644 include/log_utils.h create mode 100644 lib/log_utils.cpp diff --git a/include/MySQL_Logger.hpp b/include/MySQL_Logger.hpp index 98357613f5..1d151321dc 100644 --- a/include/MySQL_Logger.hpp +++ b/include/MySQL_Logger.hpp @@ -11,6 +11,8 @@ #define PROXYSQL_LOGGER_PTHREAD_MUTEX +class LogBuffer; +class LogBufferThreadContext; class MySQL_Logger; struct p_ml_counter { @@ -118,50 +120,48 @@ class MySQL_Event { ~MySQL_Event(); /** - * @brief Writes the event data to a file stream. - * @param f A pointer to the file stream. - * @param sess A pointer to the MySQL_Session object. - * @return The total number of bytes written. - * - * This function writes the event data to the specified file stream based on the event type and the configured log format. - */ - uint64_t write(std::fstream* f, MySQL_Session* sess); - - /** - * @brief Writes the event data in binary format (format 1) to a file stream. - * @param f A pointer to the file stream to write to. Must not be NULL. - * @return The total number of bytes written to the stream. - * - * This function serializes the event data into a binary format according to the MySQL event log format 1 specification. - * It encodes lengths using MySQL's length encoding scheme. - * The function writes the event type, thread ID, username, schema name, client address, hostgroup ID (if available), server address (if available), timestamps, client statement ID (if applicable), affected rows, last insert ID, rows sent, query digest, and query string to the file stream. - * The function writes all fields as defined by the MySQL event log format. - * It handles variable-length fields using MySQL's length encoding, which means that the length of each field is written before the field data itself. - * The function carefully handles potential errors during file writing operations. - */ - uint64_t write_query_format_1(std::fstream* f); - - - /** - * @brief Writes the event data in JSON format (format 2) to a file stream. - * @param f A pointer to the file stream to write to. Must not be NULL. - * @return The total number of bytes written to the stream (always 0 in the current implementation). + * @brief Writes the event data to a LogBuffer. + * @param f A pointer to LogBuffer to write to. + * @param sess A pointer to the MySQL_Session object. + * @return The total number of bytes written. + * + * This function writes the event data to the specified LogBuffer based on the event type and the configured log format. + */ + uint64_t write(LogBuffer* f, MySQL_Session* sess); + + /** + * @brief Writes the event data in binary format (format 1) to a LogBuffer. + * @param f A pointer to the LogBuffer to write to. + * @return The total number of bytes written. + * + * This function serializes the event data into a binary format according to the MySQL event log format 1 specification. + * It encodes lengths using MySQL's length encoding scheme. + * The function writes the event type, thread ID, username, schema name, client address, hostgroup ID (if available), server address (if available), timestamps, client statement ID (if applicable), affected rows, last insert ID, rows sent, query digest, and query string to the LogBuffer. + * The function writes all fields as defined by the MySQL event log format. + * It handles variable-length fields using MySQL's length encoding, which means that the length of each field is written before the field data itself. + */ + uint64_t write_query_format_1(LogBuffer* f); + + /** + * @brief Writes the event data in JSON format (format 2) to a LogBuffer. + * @param f A pointer to the LogBuffer to write to. Must not be NULL. + * @return The total number of bytes written (always 0 in the current implementation). * * This function serializes the event data into a JSON format. * It converts various data fields into a JSON object and writes this object to the file stream. * The function uses the nlohmann::json library for JSON serialization. * This function currently always returns 0. * The function constructs a JSON object containing relevant event information such as the hostgroup ID, thread ID, event type, username, schema name, client and server addresses, affected rows, last insert ID, rows sent, query string, timestamps, query digest, and client statement ID (if applicable). - * After constructing the JSON object, it serializes it into a string using the `dump()` method of the nlohmann::json library and writes the resulting string to the output file stream. + * After constructing the JSON object, it serializes it into a string using the `dump()` method of the nlohmann::json library and writes the resulting string to the LogBuffer. */ - uint64_t write_query_format_2_json(std::fstream* f); + uint64_t write_query_format_2_json(LogBuffer* f); /** - * @brief Writes authentication-related event data to a file stream. - * @param f A pointer to the file stream. + * @brief Writes authentication-related event data to a LogBuffer. + * @param f A pointer to the LogBuffer to write to. * @param sess A pointer to the MySQL_Session object. */ - void write_auth(std::fstream* f, MySQL_Session* sess); + void write_auth(LogBuffer* f, MySQL_Session* sess); /** * @brief Sets the client statement ID for the event. @@ -333,8 +333,10 @@ class MySQL_Logger { char* base_filename; ///< Base filename for event log files. Memory managed by the class. char* datadir; ///< Directory for event log files. Memory managed by the class. unsigned int log_file_id; ///< ID of the current event log file. + unsigned int current_log_size; ///< Current size of an event log file in bytes. unsigned int max_log_file_size; ///< Maximum size of an event log file in bytes. std::fstream* logfile; ///< File stream for event logging. + std::atomic logfile_open{false}; ///< Atomic flag indicating if the logfile is currently open. } events; /** @@ -345,8 +347,10 @@ class MySQL_Logger { char* base_filename; ///< Base filename for audit log files. Memory managed by the class. char* datadir; ///< Directory for audit log files. Memory managed by the class. unsigned int log_file_id; ///< ID of the current audit log file. + unsigned int current_log_size; ///< Current size of an audit log file in bytes. unsigned int max_log_file_size; ///< Maximum size of an audit log file in bytes. std::fstream* logfile; ///< File stream for audit logging. + std::atomic logfile_open{false}; ///< Atomic flag indicating if the logfile is currently open. } audit; /** @@ -395,6 +399,40 @@ class MySQL_Logger { rwlock_t rwlock; ///< rwlock for thread safety. #endif + // Map to store per-thread logging contexts (one context per thread handles both events and audit) + std::unordered_map> log_thread_contexts; + std::mutex log_thread_contexts_lock; ///< Mutex to protect the context map + + /** + * @brief Retrieves the logging context for the current thread. + * @return Pointer to the thread's LogBufferThreadContext containing both event and audit state. + */ + LogBufferThreadContext* get_log_thread_context(); + + /** + * @brief Checks if the events logfile is open. + * @return True if the logfile is open, false otherwise. + */ + bool is_events_logfile_open() const; + + /** + * @brief Sets the open state of the events logfile. + * @param open The new state (true for open, false for closed). + */ + void set_events_logfile_open(bool open); + + /** + * @brief Checks if the audit logfile is open. + * @return True if the logfile is open, false otherwise. + */ + bool is_audit_logfile_open() const; + + /** + * @brief Sets the open state of the audit logfile. + * @param open The new state (true for open, false for closed). + */ + void set_audit_logfile_open(bool open); + /** * @brief Closes the event log file. This function should only be called while holding the write lock. */ diff --git a/include/MySQL_Thread.h b/include/MySQL_Thread.h index 3d844dc6a9..c059b1690c 100644 --- a/include/MySQL_Thread.h +++ b/include/MySQL_Thread.h @@ -550,8 +550,13 @@ class MySQL_Threads_Handler int eventslog_default_log; int eventslog_format; int eventslog_stmt_parameters; + int eventslog_flush_timeout; + int eventslog_flush_size; + int eventslog_rate_limit; char *auditlog_filename; int auditlog_filesize; + int auditlog_flush_timeout; + int auditlog_flush_size; // SSL related, proxy to server char * ssl_p2s_ca; char * ssl_p2s_capath; diff --git a/include/PgSQL_Logger.hpp b/include/PgSQL_Logger.hpp index c3af60f864..18b3ec4236 100644 --- a/include/PgSQL_Logger.hpp +++ b/include/PgSQL_Logger.hpp @@ -2,6 +2,10 @@ #define __CLASS_PGSQL_LOGGER_H #include "proxysql.h" #include "cpp.h" +#include + +class LogBuffer; +class LogBufferThreadContext; #define PROXYSQL_LOGGER_PTHREAD_MUTEX @@ -56,10 +60,10 @@ class PgSQL_Event { public: PgSQL_Event(PGSQL_LOG_EVENT_TYPE _et, uint32_t _thread_id, char * _username, char * _schemaname , uint64_t _start_time , uint64_t _end_time , uint64_t _query_digest, char *_client, size_t _client_len); - uint64_t write(std::fstream *f, PgSQL_Session *sess); - uint64_t write_query_format_1(std::fstream *f); - uint64_t write_query_format_2_json(std::fstream *f); - void write_auth(std::fstream *f, PgSQL_Session *sess); + uint64_t write(LogBuffer *f, PgSQL_Session *sess); + uint64_t write_query_format_1(LogBuffer *f); + uint64_t write_query_format_2_json(LogBuffer *f); + void write_auth(LogBuffer *f, PgSQL_Session *sess); void set_client_stmt_name(char* client_stmt_name); void set_query(const char *ptr, int len); void set_server(int _hid, const char *ptr, int len); @@ -75,22 +79,34 @@ class PgSQL_Logger { char *base_filename; char *datadir; unsigned int log_file_id; + unsigned int current_log_size; unsigned int max_log_file_size; std::fstream *logfile; + std::atomic logfile_open{false}; } events; struct { bool enabled; char *base_filename; char *datadir; unsigned int log_file_id; + unsigned int current_log_size; unsigned int max_log_file_size; std::fstream *logfile; + std::atomic logfile_open{false}; } audit; #ifdef PROXYSQL_LOGGER_PTHREAD_MUTEX pthread_mutex_t wmutex; #else rwlock_t rwlock; #endif + std::unordered_map> log_thread_contexts; + std::mutex log_thread_contexts_lock; + + LogBufferThreadContext* get_log_thread_context(); + bool is_events_logfile_open() const; + void set_events_logfile_open(bool open); + bool is_audit_logfile_open() const; + void set_audit_logfile_open(bool open); void events_close_log_unlocked(); void events_open_log_unlocked(); void audit_close_log_unlocked(); diff --git a/include/PgSQL_Thread.h b/include/PgSQL_Thread.h index 333675f49c..475e8df5e2 100644 --- a/include/PgSQL_Thread.h +++ b/include/PgSQL_Thread.h @@ -1028,8 +1028,13 @@ class PgSQL_Threads_Handler int eventslog_filesize; int eventslog_default_log; int eventslog_format; + int eventslog_flush_timeout; + int eventslog_flush_size; + int eventslog_rate_limit; char* auditlog_filename; int auditlog_filesize; + int auditlog_flush_timeout; + int auditlog_flush_size; // SSL related, proxy to server char* ssl_p2s_ca; char* ssl_p2s_capath; diff --git a/include/log_utils.h b/include/log_utils.h new file mode 100644 index 0000000000..d06000395b --- /dev/null +++ b/include/log_utils.h @@ -0,0 +1,208 @@ +#ifndef __CLASS_LOG_UTILS_H +#define __CLASS_LOG_UTILS_H + +#include +#include +#include +#include +#include +#include +#include +#include + + +/** + * @brief Manages a string buffer and a flush timestamp for logging. + * + * Used to accumulate log data in memory before writing to a file. + */ +class LogBuffer { +private: + std::string buffer; + uint64_t last_flush_time; + +public: + LogBuffer(); + + /** + * @brief Stream insertion operator for appending strings to the buffer. + * @param value The string to append to the buffer. + * @return Reference to this LogBuffer for method chaining. + */ + LogBuffer& operator<<(const std::string& value); + + /** + * @brief Stream insertion operator for appending C-strings to the buffer. + * @param value The C-string to append to the buffer. + * @return Reference to this LogBuffer for method chaining. + */ + LogBuffer& operator<<(const char* value); + + /** + * @brief Stream insertion operator for appending characters to the buffer. + * @param value The character to append to the buffer. + * @return Reference to this LogBuffer for method chaining. + */ + LogBuffer& operator<<(char value); + + /** + * @brief Appends a string to the buffer. + * @param str The string to append to the buffer. + * @return Reference to this LogBuffer for method chaining. + */ + LogBuffer& append(const std::string& str); + + /** + * @brief Appends a C-string to the buffer. + * @param str The C-string to append to the buffer. + * @return Reference to this LogBuffer for method chaining. + */ + LogBuffer& append(const char* str); + + /** + * @brief Appends a character sequence to the buffer. + * @param str Pointer to the character sequence. + * @param len Length of the character sequence. + * @return Reference to this LogBuffer for method chaining. + */ + LogBuffer& append(const char* str, size_t len); + + /** + * @brief Writes a string to the buffer (alias for append). + * @param str The string to write to the buffer. + * @return Reference to this LogBuffer for method chaining. + */ + LogBuffer& write(const std::string& str); + + /** + * @brief Writes a character sequence to the buffer. + * @param str Pointer to the character sequence. + * @param len Length of the character sequence. + * @return Reference to this LogBuffer for method chaining. + */ + LogBuffer& write(const char* str, size_t len); + + /** + * @brief Resets the buffer for next use. + * + * Clears the buffer content and updates the last flush time. + * + * @param flush_time The timestamp at which the buffer was flushed. + */ + void reset(uint64_t flush_time); + + /** + * @brief Sets the last flush time explicitly. + * + * @param flush_time The timestamp to set. + */ + void set_last_flush_time(uint64_t flush_time); + + /** + * @brief Returns the last flush time. + */ + uint64_t get_last_flush_time() const; + + /** + * @brief Returns true if the buffer is empty. + */ + bool empty() const; + + /** + * @brief Returns the size of the buffer. + */ + size_t size() const; + + /** + * @brief Returns a pointer to the buffer data. + */ + const char* data() const; + + /** + * @brief Flushes the buffer to an output file stream. + * + * Writes the entire contents of the buffer to the provided file stream and resets the buffer. + * + * @param logfile The output file stream to write to. + */ + void flush_to_file(std::fstream* logfile); +}; + +/** + * @brief Class to hold per-thread logging context for both event and audit logging. + * + * This class encapsulates all per-thread state needed for logging operations: + * - events: Buffer and timestamp state for event logging + * - audit: Buffer and timestamp state for audit logging + * - rng: Thread-local Mersenne Twister random number generator for log sampling + * - dist: Uniform real distribution [0.0, 1.0) for sampling + * + * Each thread gets its own instance to avoid race conditions and lock contention. + */ +class LogBufferThreadContext { +private: + std::mt19937 rng; ///< Mersenne Twister random number generator (per-thread) + std::uniform_real_distribution dist; ///< Uniform distribution [0.0, 1.0) + +public: + LogBuffer events; ///< Event log buffer and timestamp + LogBuffer audit; ///< Audit log buffer and timestamp + + /** + * @brief Constructor that initializes the thread logging context with random seed. + * + * Seeds the RNG using a combination of hardware entropy, high-resolution time, + * and thread ID for maximum uniqueness and unpredictability. + */ + LogBufferThreadContext(); + + /** + * @brief Determines if an event should be logged based on the rate limit. + * + * Calculates whether a random sample falls within the acceptance threshold + * determined by the rate limit. + * + * @param rate_limit The sampling rate limit. 1 means log all, N means log approx 1/N. + * @return true if the event should be logged, false otherwise. + */ + bool should_log(int rate_limit); +}; + +/** + * @brief Retrieves or creates the LogBufferThreadContext for the current thread. + * + * This helper function checks if a context for the current thread exists in the provided map. + * If found, it returns the existing context. Otherwise, it creates a new one, initializes + * it with the current time, and adds it to the map. + * + * @param log_thread_contexts The map of thread contexts. + * @param log_thread_contexts_lock The mutex protecting access to the map. + * @param current_time The current time to initialize last_flush_time (e.g. monotonic_time()). + * @return LogBufferThreadContext* Pointer to the thread's logging context. + */ +LogBufferThreadContext* GetLogBufferThreadContext(std::unordered_map>& log_thread_contexts, std::mutex& log_thread_contexts_lock, uint64_t current_time); + +/** + * @brief Helper function to flush buffer to file stream and rotate file if required. + * + * @param buffer The LogBuffer to flush + * @param logfile Pointer to the logfile stream + * @param current_log_size Reference to current log size counter + * @param max_log_file_size Maximum log file size before rotation + * @param lock_fn Function to acquire write lock + * @param unlock_fn Function to release write lock + * @param rotate_fn Function to rotate the log file + * @param reset_time Timestamp to use when resetting the buffer after flush + * @return true if buffer was flushed, false otherwise + */ +bool flush_and_rotate( + LogBuffer& buffer, + std::fstream* logfile, + unsigned int& current_log_size, + unsigned int max_log_file_size, + std::function lock_fn, + std::function unlock_fn, + std::function rotate_fn, + uint64_t reset_time = 0); + +#endif /* __CLASS_LOG_UTILS_H */ diff --git a/include/proxysql_structs.h b/include/proxysql_structs.h index 0efb79576d..d9dfc6e672 100644 --- a/include/proxysql_structs.h +++ b/include/proxysql_structs.h @@ -1172,10 +1172,15 @@ __thread int pgsql_thread___query_digests_groups_grouping_limit; __thread char* pgsql_thread___auditlog_filename; __thread int pgsql_thread___auditlog_filesize; +__thread int pgsql_thread___auditlog_flush_timeout; +__thread int pgsql_thread___auditlog_flush_size; __thread char* pgsql_thread___eventslog_filename; __thread int pgsql_thread___eventslog_filesize; __thread int pgsql_thread___eventslog_default_log; __thread int pgsql_thread___eventslog_format; +__thread int pgsql_thread___eventslog_flush_timeout; +__thread int pgsql_thread___eventslog_flush_size; +__thread int pgsql_thread___eventslog_rate_limit; __thread char* pgsql_thread___firewall_whitelist_errormsg; __thread bool pgsql_thread___firewall_whitelist_enabled; __thread int pgsql_thread___query_processor_iterations; @@ -1333,10 +1338,15 @@ __thread int mysql_thread___eventslog_buffer_max_query_length; __thread int mysql_thread___eventslog_default_log; __thread int mysql_thread___eventslog_format; __thread int mysql_thread___eventslog_stmt_parameters; +__thread int mysql_thread___eventslog_flush_timeout; +__thread int mysql_thread___eventslog_flush_size; +__thread int mysql_thread___eventslog_rate_limit; /* variables used by audit log */ __thread char * mysql_thread___auditlog_filename; __thread int mysql_thread___auditlog_filesize; +__thread int mysql_thread___auditlog_flush_timeout; +__thread int mysql_thread___auditlog_flush_size; /* variables used by the monitoring module */ __thread int mysql_thread___monitor_enabled; @@ -1476,10 +1486,15 @@ extern __thread int pgsql_thread___query_digests_groups_grouping_limit; extern __thread char* pgsql_thread___auditlog_filename; extern __thread int pgsql_thread___auditlog_filesize; +extern __thread int pgsql_thread___auditlog_flush_timeout; +extern __thread int pgsql_thread___auditlog_flush_size; extern __thread char* pgsql_thread___eventslog_filename; extern __thread int pgsql_thread___eventslog_filesize; extern __thread int pgsql_thread___eventslog_default_log; extern __thread int pgsql_thread___eventslog_format; +extern __thread int pgsql_thread___eventslog_flush_timeout; +extern __thread int pgsql_thread___eventslog_flush_size; +extern __thread int pgsql_thread___eventslog_rate_limit; extern __thread char* pgsql_thread___firewall_whitelist_errormsg; extern __thread bool pgsql_thread___firewall_whitelist_enabled; extern __thread int pgsql_thread___query_processor_iterations; @@ -1637,10 +1652,15 @@ extern __thread int mysql_thread___eventslog_buffer_max_query_length; extern __thread int mysql_thread___eventslog_default_log; extern __thread int mysql_thread___eventslog_format; extern __thread int mysql_thread___eventslog_stmt_parameters; +extern __thread int mysql_thread___eventslog_flush_timeout; +extern __thread int mysql_thread___eventslog_flush_size; +extern __thread int mysql_thread___eventslog_rate_limit; /* variables used by audit log */ extern __thread char * mysql_thread___auditlog_filename; extern __thread int mysql_thread___auditlog_filesize; +extern __thread int mysql_thread___auditlog_flush_timeout; +extern __thread int mysql_thread___auditlog_flush_size; /* variables used by the monitoring module */ extern __thread int mysql_thread___monitor_enabled; diff --git a/lib/Makefile b/lib/Makefile index 6dcce9f395..c573318db8 100644 --- a/lib/Makefile +++ b/lib/Makefile @@ -68,7 +68,7 @@ MYCXXFLAGS := $(STDCPP) $(MYCFLAGS) $(PSQLCH) $(PSQLGA) $(ENABLE_EPOLL) default: libproxysql.a .PHONY: default -_OBJ_CXX := ProxySQL_GloVars.oo network.oo debug.oo configfile.oo Query_Cache.oo SpookyV2.oo MySQL_Authentication.oo gen_utils.oo sqlite3db.oo mysql_connection.oo MySQL_HostGroups_Manager.oo mysql_data_stream.oo MySQL_Thread.oo MySQL_Session.oo MySQL_Protocol.oo mysql_backend.oo Query_Processor.oo MySQL_Query_Processor.oo PgSQL_Query_Processor.oo ProxySQL_Admin.oo ProxySQL_Config.oo ProxySQL_Restapi.oo MySQL_Monitor.oo MySQL_Logger.oo thread.oo MySQL_PreparedStatement.oo ProxySQL_Cluster.oo ClickHouse_Authentication.oo ClickHouse_Server.oo ProxySQL_Statistics.oo Chart_bundle_js.oo ProxySQL_HTTP_Server.oo ProxySQL_RESTAPI_Server.oo font-awesome.min.css.oo main-bundle.min.css.oo MySQL_Variables.oo c_tokenizer.oo proxysql_utils.oo proxysql_coredump.oo proxysql_sslkeylog.oo \ +_OBJ_CXX := ProxySQL_GloVars.oo network.oo debug.oo configfile.oo Query_Cache.oo SpookyV2.oo MySQL_Authentication.oo gen_utils.oo sqlite3db.oo mysql_connection.oo MySQL_HostGroups_Manager.oo mysql_data_stream.oo MySQL_Thread.oo MySQL_Session.oo MySQL_Protocol.oo mysql_backend.oo Query_Processor.oo MySQL_Query_Processor.oo PgSQL_Query_Processor.oo ProxySQL_Admin.oo ProxySQL_Config.oo ProxySQL_Restapi.oo MySQL_Monitor.oo MySQL_Logger.oo log_utils.oo thread.oo MySQL_PreparedStatement.oo ProxySQL_Cluster.oo ClickHouse_Authentication.oo ClickHouse_Server.oo ProxySQL_Statistics.oo Chart_bundle_js.oo ProxySQL_HTTP_Server.oo ProxySQL_RESTAPI_Server.oo font-awesome.min.css.oo main-bundle.min.css.oo MySQL_Variables.oo c_tokenizer.oo proxysql_utils.oo proxysql_coredump.oo proxysql_sslkeylog.oo \ sha256crypt.oo \ BaseSrvList.oo BaseHGC.oo Base_HostGroups_Manager.oo \ QP_rule_text.oo QP_query_digest_stats.oo \ diff --git a/lib/MySQL_Logger.cpp b/lib/MySQL_Logger.cpp index 1ac28a6143..0e96150c45 100644 --- a/lib/MySQL_Logger.cpp +++ b/lib/MySQL_Logger.cpp @@ -5,12 +5,12 @@ using json = nlohmann::json; #include #include "proxysql.h" #include "cpp.h" -#include #include "MySQL_Data_Stream.h" #include "MySQL_Query_Processor.h" #include "MySQL_PreparedStatement.h" #include "MySQL_Logger.hpp" +#include "log_utils.h" #include #include @@ -497,7 +497,7 @@ void MySQL_Event::set_server(int _hid, const char *ptr, int len) { hid=_hid; } -uint64_t MySQL_Event::write(std::fstream *f, MySQL_Session *sess) { +uint64_t MySQL_Event::write(LogBuffer *f, MySQL_Session *sess) { uint64_t total_bytes=0; switch (et) { case PROXYSQL_COM_QUERY: @@ -535,17 +535,20 @@ uint64_t MySQL_Event::write(std::fstream *f, MySQL_Session *sess) { return total_bytes; } -void MySQL_Event::write_auth(std::fstream *f, MySQL_Session *sess) { +void MySQL_Event::write_auth(LogBuffer *f, MySQL_Session *sess) { json j = {}; j["timestamp"] = start_time/1000; { time_t timer=start_time/1000/1000; - struct tm* tm_info; - tm_info = localtime(&timer); + struct tm tm_info; char buffer1[36]; char buffer2[64]; - strftime(buffer1, 32, "%Y-%m-%d %H:%M:%S", tm_info); - sprintf(buffer2,"%s.%03u", buffer1, (unsigned)(start_time%1000000)/1000); + if (localtime_r(&timer, &tm_info)) { + strftime(buffer1, 32, "%Y-%m-%d %H:%M:%S", &tm_info); + sprintf(buffer2,"%s.%03u", buffer1, (unsigned)(start_time%1000000)/1000); + } else { + snprintf(buffer2, sizeof(buffer2), "invalid_date"); + } j["time"] = buffer2; } j["thread_id"] = thread_id; @@ -623,12 +626,15 @@ void MySQL_Event::write_auth(std::fstream *f, MySQL_Session *sess) { uint64_t timediff = curtime_mono - sess->start_time; uint64_t orig_time = curtime_real - timediff; time_t timer= (orig_time)/1000/1000; - struct tm* tm_info; - tm_info = localtime(&timer); + struct tm tm_info; char buffer1[36]; char buffer2[64]; - strftime(buffer1, 32, "%Y-%m-%d %H:%M:%S", tm_info); - sprintf(buffer2,"%s.%03u", buffer1, (unsigned)(orig_time%1000000)/1000); + if (localtime_r(&timer, &tm_info)) { + strftime(buffer1, 32, "%Y-%m-%d %H:%M:%S", &tm_info); + sprintf(buffer2,"%s.%03u", buffer1, (unsigned)(orig_time%1000000)/1000); + } else { + snprintf(buffer2, sizeof(buffer2), "invalid_date"); + } j["creation_time"] = buffer2; //unsigned long long life = sess->thread->curtime - sess->start_time; //life/=1000; @@ -653,11 +659,11 @@ void MySQL_Event::write_auth(std::fstream *f, MySQL_Session *sess) { // right before the write to disk //GloMyLogger->wrlock(); //move wrlock() function to log_audit_entry() function, avoid to get a null pointer in a multithreaded environment - *f << j.dump(-1, ' ', false, json::error_handler_t::replace) << std::endl; + *f << j.dump(-1, ' ', false, json::error_handler_t::replace) << '\n'; } /** - * @brief Writes a query event to the given file stream in a specifically encoded format. + * @brief Writes a query event to the given LogBuffer in a specifically encoded format. * * This function assembles and writes a MySQL query event record to the provided std::fstream. * It computes the total byte size of the record by encoding various fields (such as thread ID, username, schema name, @@ -695,25 +701,25 @@ void MySQL_Event::write_auth(std::fstream *f, MySQL_Session *sess) { * minimizing the duration for which the resource is locked. * - The function depends on external helper routines: * • mysql_encode_length: to determine the number of bytes required to store an integer in a variable-length format. - * • write_encoded_length: to actually write the encoded lengths to the buffer and then to the file stream. + * • write_encoded_length: to actually write the encoded lengths to the buffer and then to the LogBuffer. * • getValueForBind: to convert parameter data bound to a query into a string representation. * - * @param f[in,out] Pointer to a std::fstream object representing the file stream to write the query event record. + * @param f[in,out] Pointer to a LogBuffer object to write the query event record. * - * @return Returns the total size in bytes (as a uint64_t) that was written to the file stream. + * @return Returns the total size in bytes (as a uint64_t) that was written to the LogBuffer. * * @note The function assumes that the session and associated metadata for prepared statements (stmt_meta) are valid and * available when processing a COM_STMT_EXECUTE event. * - * @warning Ensure that the passed file stream pointer is valid and opened for writing, as no internal checks on the stream's state - * are performed. + * @warning Ensure that the passed LogBuffer pointer is valid and events.logfile is opened for writing, as no internal checks + * on the stream's state * * @see mysql_encode_length, write_encoded_length, getValueForBind, stmt_execute_metadata_t, MYSQL_BIND * * Returns: * The total number of bytes written for the event log. */ -uint64_t MySQL_Event::write_query_format_1(std::fstream *f) { +uint64_t MySQL_Event::write_query_format_1(LogBuffer *f) { uint64_t total_bytes=0; total_bytes+=1; // et total_bytes+=mysql_encode_length(thread_id, NULL); @@ -900,7 +906,7 @@ uint64_t MySQL_Event::write_query_format_1(std::fstream *f) { // - Calculates the required bitmap size as (num_params + 7) / 8 bytes where each bit represents // whether a parameter value is null. // - Iterates over each parameter, setting the corresponding bit in the bitmap if the parameter is null. - // - Writes the complete null bitmap to the file stream. + // - Writes the complete null bitmap to the LogBuffer. size_t bitmap_size = (num_params + 7) / 8; // one bit per parameter std::vector null_bitmap(bitmap_size, 0); for (uint16_t i = 0; i < num_params; i++) { @@ -916,7 +922,7 @@ uint64_t MySQL_Event::write_query_format_1(std::fstream *f) { for (uint16_t i = 0; i < num_params; i++) { // - Writes the parameter type: // * Retrieves a 2-byte parameter type from the MYSQL_BIND structure associated with the current parameter. - // * Writes these 2 bytes directly to the file stream. + // * Writes these 2 bytes directly to the LogBuffer. const MYSQL_BIND *bind = meta->binds ? &meta->binds[i] : nullptr; uint16_t param_type = (bind ? bind->buffer_type : 0); // Write parameter type (2 bytes). @@ -971,11 +977,11 @@ uint64_t MySQL_Event::write_query_format_1(std::fstream *f) { } /** - * @brief Writes the MySQL event details in JSON format to a file stream. + * @brief Writes the MySQL event details in JSON format to LogBuffer. * * This method generates a JSON object containing various details about the MySQL event, * including query information, timestamps, execution metadata, error information, and - * performance metrics. The resulting JSON string is then written to the provided file stream. + * performance metrics. The resulting JSON string is then written to the provided LogBuffer. * * The JSON object includes the following keys (when applicable): * - "hostgroup_id": The hostgroup identifier if set; otherwise, it defaults to -1. @@ -1003,13 +1009,13 @@ uint64_t MySQL_Event::write_query_format_1(std::fstream *f) { * Additionally, for executed statements (PROXYSQL_COM_STMT_EXECUTE), if session data is available, * prepared statement parameters and added to the JSON. * - * The generated JSON is dumped into the given file stream with error replacement settings to ensure + * The generated JSON is dumped into the given LogBuffer with error replacement settings to ensure * proper serialization even in the presence of encoding errors. * - * @param[out] f Pointer to a std::fstream where the JSON string will be written. + * @param[out] f Pointer to a LogBuffer where the JSON string will be written. * @return uint64_t Always returns 0, as the current implementation does not compute total bytes written. */ -uint64_t MySQL_Event::write_query_format_2_json(std::fstream *f) { +uint64_t MySQL_Event::write_query_format_2_json(LogBuffer *f) { json j = {}; uint64_t total_bytes=0; if (hid!=UINT64_MAX) { @@ -1073,23 +1079,29 @@ uint64_t MySQL_Event::write_query_format_2_json(std::fstream *f) { j["starttime_timestamp_us"] = start_time; { time_t timer=start_time/1000/1000; - struct tm* tm_info; - tm_info = localtime(&timer); + struct tm tm_info; char buffer1[36]; char buffer2[64]; - strftime(buffer1, 32, "%Y-%m-%d %H:%M:%S", tm_info); - sprintf(buffer2,"%s.%06u", buffer1, (unsigned)(start_time%1000000)); + if (localtime_r(&timer, &tm_info)) { + strftime(buffer1, 32, "%Y-%m-%d %H:%M:%S", &tm_info); + sprintf(buffer2,"%s.%06u", buffer1, (unsigned)(start_time%1000000)); + } else { + snprintf(buffer2, sizeof(buffer2), "invalid_date"); + } j["starttime"] = buffer2; } j["endtime_timestamp_us"] = end_time; { time_t timer=end_time/1000/1000; - struct tm* tm_info; - tm_info = localtime(&timer); + struct tm tm_info; char buffer1[36]; char buffer2[64]; - strftime(buffer1, 32, "%Y-%m-%d %H:%M:%S", tm_info); - sprintf(buffer2,"%s.%06u", buffer1, (unsigned)(end_time%1000000)); + if (localtime_r(&timer, &tm_info)) { + strftime(buffer1, 32, "%Y-%m-%d %H:%M:%S", &tm_info); + sprintf(buffer2,"%s.%06u", buffer1, (unsigned)(end_time%1000000)); + } else { + snprintf(buffer2, sizeof(buffer2), "invalid_date"); + } j["endtime"] = buffer2; } j["duration_us"] = end_time-start_time; @@ -1113,7 +1125,7 @@ uint64_t MySQL_Event::write_query_format_2_json(std::fstream *f) { //GloMyLogger->wrlock(); //move wrlock() function to log_request() function, avoid to get a null pointer in a multithreaded environment - *f << j.dump(-1, ' ', false, json::error_handler_t::replace) << std::endl; + *f << j.dump(-1, ' ', false, json::error_handler_t::replace) << '\n'; return total_bytes; // always 0 } @@ -1179,9 +1191,11 @@ MySQL_Logger::MySQL_Logger() : metrics{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} { #endif events.logfile=NULL; events.log_file_id=0; + events.current_log_size=0; events.max_log_file_size=100*1024*1024; audit.logfile=NULL; audit.log_file_id=0; + audit.current_log_size=0; audit.max_log_file_size=100*1024*1024; MyLogCB = new MySQL_Logger_CircularBuffer(0); @@ -1195,6 +1209,27 @@ MySQL_Logger::MySQL_Logger() : metrics{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0} { }; MySQL_Logger::~MySQL_Logger() { + // Flush all per-thread buffers before destroying the logger + { + std::lock_guard lock(log_thread_contexts_lock); + for (const auto& kv : log_thread_contexts) { + LogBufferThreadContext* log_ctx = kv.second.get(); + if (!log_ctx->events.empty()) { + flush_and_rotate(log_ctx->events, events.logfile, events.current_log_size, events.max_log_file_size, + [this]() { wrlock(); }, + [this]() { wrunlock(); }, + nullptr + ); + } + if (!log_ctx->audit.empty()) { + flush_and_rotate(log_ctx->audit, audit.logfile, audit.current_log_size, audit.max_log_file_size, + [this]() { wrlock(); }, + [this]() { wrunlock(); }, + nullptr + ); + } + } + } if (events.datadir) { free(events.datadir); } @@ -1230,6 +1265,21 @@ void MySQL_Logger::flush_log() { wrunlock(); } +bool MySQL_Logger::is_events_logfile_open() const { + return events.logfile_open.load(); +} + +void MySQL_Logger::set_events_logfile_open(bool open) { + events.logfile_open.store(open); +} + +bool MySQL_Logger::is_audit_logfile_open() const { + return audit.logfile_open.load(); +} + +void MySQL_Logger::set_audit_logfile_open(bool open) { + audit.logfile_open.store(open); +} void MySQL_Logger::events_close_log_unlocked() { if (events.logfile) { @@ -1280,9 +1330,12 @@ void MySQL_Logger::events_open_log_unlocked() { events.logfile->exceptions ( std::ofstream::failbit | std::ofstream::badbit ); try { events.logfile->open(filen , std::ios::out | std::ios::binary); + events.current_log_size = 0; + set_events_logfile_open(true); proxy_info("Starting new mysql event log file %s\n", filen); if (mysql_thread___eventslog_format == 1) { // create a new event, type PROXYSQL_METADATA, that writes the ProxySQL version as part of the payload + LogBufferThreadContext *log_ctx = get_log_thread_context(); json j = {}; j["version"] = string(PROXYSQL_VERSION); string msg = j.dump(); @@ -1299,13 +1352,17 @@ void MySQL_Logger::events_open_log_unlocked() { nullptr // no session associated ); metaEvent.set_query((char *)"",0); - metaEvent.write(events.logfile, nullptr); + metaEvent.write(&log_ctx->events, nullptr); + log_ctx->events.flush_to_file(events.logfile); + events.current_log_size += log_ctx->events.size(); } } catch (const std::ofstream::failure&) { proxy_error("Error creating new mysql event log file %s\n", filen); delete events.logfile; events.logfile=NULL; + events.current_log_size = 0; + set_events_logfile_open(false); } free(filen); }; @@ -1329,12 +1386,16 @@ void MySQL_Logger::audit_open_log_unlocked() { audit.logfile->exceptions ( std::ofstream::failbit | std::ofstream::badbit ); try { audit.logfile->open(filen , std::ios::out | std::ios::binary); + audit.current_log_size = 0; + set_audit_logfile_open(true); proxy_info("Starting new audit log file %s\n", filen); } catch (const std::ofstream::failure&) { proxy_error("Error creating new audit log file %s\n", filen); delete audit.logfile; audit.logfile=NULL; + audit.current_log_size=0; + set_audit_logfile_open(false); } free(filen); }; @@ -1403,12 +1464,20 @@ void MySQL_Logger::log_request(MySQL_Session *sess, MySQL_Data_Stream *myds, con int elmhs = mysql_thread___eventslog_buffer_history_size; if (elmhs == 0) { if (events.enabled==false) return; - if (events.logfile==NULL) return; + if (!is_events_logfile_open()) return; } // 'MySQL_Session::client_myds' could be NULL in case of 'RequestEnd' being called over a freshly created session // due to a failed 'CONNECTION_RESET'. Because this scenario isn't a client request, we just return. if (sess->client_myds==NULL || sess->client_myds->myconn== NULL) return; + // Obtain current thread's log ctx + LogBufferThreadContext* log_ctx = get_log_thread_context(); + + // Sample event logs. Set mysql-eventslog_rate_limit=1 (default) to log all events + if (mysql_thread___eventslog_rate_limit > 1) + if (!log_ctx->should_log(mysql_thread___eventslog_rate_limit)) + return; + MySQL_Connection_userinfo *ui=sess->client_myds->myconn->userinfo; uint64_t curtime_real=realtime_time(); @@ -1523,21 +1592,20 @@ void MySQL_Logger::log_request(MySQL_Session *sess, MySQL_Data_Stream *myds, con // for performance reason, we are moving the write lock // right before the write to disk //wrlock(); - - - if ((events.enabled == true) && (events.logfile != nullptr)) { - //add a mutex lock in a multithreaded environment, avoid to get a null pointer of events.logfile that leads to the program coredump - GloMyLogger->wrlock(); - me.write(events.logfile, sess); + if (is_events_logfile_open()) { + me.write(&log_ctx->events, sess); + if (log_ctx->events.size() > static_cast(mysql_thread___eventslog_flush_size)) { + //add a mutex lock in a multithreaded environment, avoid to get a null pointer of events.logfile that leads to the program coredump + flush_and_rotate(log_ctx->events, events.logfile, events.current_log_size, events.max_log_file_size, + [this]() { wrlock(); }, + [this]() { wrunlock(); }, + [this]() { events_flush_log_unlocked(); }, + monotonic_time() + ); + } + } - - unsigned long curpos=events.logfile->tellp(); - if (curpos > events.max_log_file_size) { - events_flush_log_unlocked(); - } - wrunlock(); - } if (MyLogCB->buffer_size != 0) { MySQL_Event *me2 = new MySQL_Event(me); MyLogCB->insert(me2); @@ -1559,11 +1627,14 @@ void MySQL_Logger::log_request(MySQL_Session *sess, MySQL_Data_Stream *myds, con void MySQL_Logger::log_audit_entry(log_event_type _et, MySQL_Session *sess, MySQL_Data_Stream *myds, char *xi) { if (audit.enabled==false) return; - if (audit.logfile==NULL) return; + if (!is_audit_logfile_open()) return; if (sess == NULL) return; if (sess->client_myds == NULL) return; + // Obtain current thread's log ctx + LogBufferThreadContext* log_ctx = get_log_thread_context(); + MySQL_Connection_userinfo *ui= NULL; if (sess) { if (sess->client_myds) { @@ -1686,16 +1757,18 @@ void MySQL_Logger::log_audit_entry(log_event_type _et, MySQL_Session *sess, MySQ // right before the write to disk //wrlock(); - //add a mutex lock in a multithreaded environment, avoid to get a null pointer of events.logfile that leads to the program coredump - GloMyLogger->wrlock(); - me.write(audit.logfile, sess); - - - unsigned long curpos=audit.logfile->tellp(); - if (curpos > audit.max_log_file_size) { - audit_flush_log_unlocked(); - } - wrunlock(); + if (is_audit_logfile_open()) { + me.write(&log_ctx->audit, sess); + if (log_ctx->audit.size() > static_cast(mysql_thread___auditlog_flush_size)) { + //add a mutex lock in a multithreaded environment, avoid to get a null pointer of audit.logfile that leads to the program coredump + flush_and_rotate(log_ctx->audit, audit.logfile, audit.current_log_size, audit.max_log_file_size, + [this]() { wrlock(); }, + [this]() { wrunlock(); }, + [this]() { audit_flush_log_unlocked(); }, + monotonic_time() + ); + } + } if (cl && sess->client_myds->addr.port) { free(ca); @@ -1706,14 +1779,42 @@ void MySQL_Logger::log_audit_entry(log_event_type _et, MySQL_Session *sess, MySQ } void MySQL_Logger::flush() { - wrlock(); - if (events.logfile) { - events.logfile->flush(); - } - if (audit.logfile) { - audit.logfile->flush(); - } - wrunlock(); + LogBufferThreadContext* log_ctx = get_log_thread_context(); + const uint64_t current_time = monotonic_time(); + + // eventslog + if (is_events_logfile_open()) { + if (log_ctx->events.size() > 0 && + (current_time - log_ctx->events.get_last_flush_time()) > static_cast(mysql_thread___eventslog_flush_timeout) * 1000) { + flush_and_rotate( + log_ctx->events, + events.logfile, + events.current_log_size, + events.max_log_file_size, + [this]() { wrlock(); }, + [this]() { wrunlock(); }, + [this]() { events_flush_log_unlocked(); }, + current_time + ); + } + } + + // auditlogs + if (is_audit_logfile_open()) { + if (log_ctx->audit.size() > 0 && + (current_time - log_ctx->audit.get_last_flush_time()) > static_cast(mysql_thread___auditlog_flush_timeout) * 1000) { + flush_and_rotate( + log_ctx->audit, + audit.logfile, + audit.current_log_size, + audit.max_log_file_size, + [this]() { wrlock(); }, + [this]() { wrunlock(); }, + [this]() { audit_flush_log_unlocked(); }, + current_time + ); + } + } } unsigned int MySQL_Logger::events_find_next_id() { @@ -2060,3 +2161,7 @@ void MySQL_Logger::p_update_metrics() { const auto& gauges { this->prom_metrics.p_gauge_array }; gauges[ml_g::circular_buffer_events_size]->Set(MyLogCB->size()); } + +LogBufferThreadContext* MySQL_Logger::get_log_thread_context() { + return GetLogBufferThreadContext(log_thread_contexts, log_thread_contexts_lock, monotonic_time()); +} diff --git a/lib/MySQL_Thread.cpp b/lib/MySQL_Thread.cpp index 0427b26173..dcba23db97 100644 --- a/lib/MySQL_Thread.cpp +++ b/lib/MySQL_Thread.cpp @@ -365,8 +365,13 @@ static char * mysql_thread_variables_names[]= { (char *)"eventslog_default_log", (char *)"eventslog_format", (char *)"eventslog_stmt_parameters", + (char *)"eventslog_flush_timeout", + (char *)"eventslog_flush_size", + (char *)"eventslog_rate_limit", (char *)"auditlog_filename", (char *)"auditlog_filesize", + (char *)"auditlog_flush_timeout", + (char *)"auditlog_flush_size", //(char *)"default_charset", // removed in 2.0.13 . Obsoleted previously using MySQL_Variables instead (char *)"handle_unknown_charset", (char *)"free_connections_pct", @@ -1130,8 +1135,13 @@ MySQL_Threads_Handler::MySQL_Threads_Handler() { variables.eventslog_default_log=0; variables.eventslog_format=1; variables.eventslog_stmt_parameters=0; + variables.eventslog_flush_timeout=1000; + variables.eventslog_flush_size=4096; + variables.eventslog_rate_limit=1; variables.auditlog_filename=strdup((char *)""); variables.auditlog_filesize=100*1024*1024; + variables.auditlog_flush_timeout=1000; + variables.auditlog_flush_size=4096; //variables.server_capabilities=CLIENT_FOUND_ROWS | CLIENT_PROTOCOL_41 | CLIENT_IGNORE_SIGPIPE | CLIENT_TRANSACTIONS | CLIENT_SECURE_CONNECTION | CLIENT_CONNECT_WITH_DB; // major upgrade in 2.0.0 variables.server_capabilities = CLIENT_MYSQL | CLIENT_FOUND_ROWS | CLIENT_PROTOCOL_41 | CLIENT_IGNORE_SIGPIPE | CLIENT_TRANSACTIONS | CLIENT_SECURE_CONNECTION | CLIENT_CONNECT_WITH_DB | CLIENT_PLUGIN_AUTH;; @@ -1802,6 +1812,8 @@ bool MySQL_Threads_Handler::set_variable(char *name, const char *value) { // thi // if we are switching format, we need to switch file too if (GloMyLogger) { proxy_info("Switching query logging format from %d to %d\n", variables.eventslog_format , intv); + // write existing logs (if any) to file before switching the file + GloMyLogger->flush(); GloMyLogger->flush_log(); } variables.eventslog_format=intv; @@ -1811,6 +1823,63 @@ bool MySQL_Threads_Handler::set_variable(char *name, const char *value) { // thi return false; } } + if (!strcasecmp(name,"eventslog_flush_timeout")) { + int intv=atoi(value); + if (intv >= 0) { + variables.eventslog_flush_timeout=intv; + if (intv > 5 * 60 * 1000) { + proxy_warning("mysql-eventslog_flush_timeout is set to a high value: %dms\n", intv); + } + return true; + } else { + return false; + } + } + if (!strcasecmp(name,"eventslog_flush_size")) { + int intv=atoi(value); + if (intv >= 0) { + variables.eventslog_flush_size=intv; + if (intv > 10 * 1024 * 1024) { + proxy_warning("mysql-eventslog_flush_size is set to a high value: %d\n", intv); + } + return true; + } else { + return false; + } + } + if (!strcasecmp(name,"eventslog_rate_limit")) { + int intv=atoi(value); + if (intv >= 1) { + variables.eventslog_rate_limit=intv; + return true; + } else { + return false; + } + } + if (!strcasecmp(name,"auditlog_flush_timeout")) { + int intv=atoi(value); + if (intv >= 0) { + variables.auditlog_flush_timeout=intv; + if (intv > 5 * 60 * 1000) { + proxy_warning("mysql-auditlog_flush_timeout is set to a high value: %dms\n", intv); + } + return true; + } else { + return false; + } + } + if (!strcasecmp(name,"auditlog_flush_size")) { + int intv=atoi(value); + if (intv >= 0) { + variables.auditlog_flush_size=intv; + if (intv > 10 * 1024 * 1024) { + proxy_warning("mysql-auditlog_flush_size is set to a high value: %d\n", intv); + } + return true; + } else { + return false; + } + } if (!strcasecmp(name,"default_schema")) { if (vallen) { free(variables.default_schema); @@ -2372,6 +2441,11 @@ char ** MySQL_Threads_Handler::get_variables_list() { // the input validation for these variables MUST be EXPLICIT VariablesPointers_int["binlog_reader_connect_retry_msec"] = make_tuple(&variables.binlog_reader_connect_retry_msec, 0, 0, true); VariablesPointers_int["eventslog_format"] = make_tuple(&variables.eventslog_format, 0, 0, true); + VariablesPointers_int["eventslog_flush_timeout"] = make_tuple(&variables.eventslog_flush_timeout, 0, 0, true); + VariablesPointers_int["eventslog_flush_size"] = make_tuple(&variables.eventslog_flush_size, 0, 0, true); + VariablesPointers_int["eventslog_rate_limit"] = make_tuple(&variables.eventslog_rate_limit, 0, 0, true); + VariablesPointers_int["auditlog_flush_timeout"] = make_tuple(&variables.auditlog_flush_timeout, 0, 0, true); + VariablesPointers_int["auditlog_flush_size"] = make_tuple(&variables.auditlog_flush_size, 0, 0, true); VariablesPointers_int["wait_timeout"] = make_tuple(&variables.wait_timeout, 0, 0, true); VariablesPointers_int["select_version_forwarding"] = make_tuple(&variables.select_version_forwarding, 0, 3, false); VariablesPointers_int["data_packets_history_size"] = make_tuple(&variables.data_packets_history_size, 0, 0, true); @@ -4362,9 +4436,14 @@ void MySQL_Thread::refresh_variables() { REFRESH_VARIABLE_INT(eventslog_default_log); REFRESH_VARIABLE_INT(eventslog_format); REFRESH_VARIABLE_INT(eventslog_stmt_parameters); + REFRESH_VARIABLE_INT(eventslog_flush_timeout); + REFRESH_VARIABLE_INT(eventslog_flush_size); + REFRESH_VARIABLE_INT(eventslog_rate_limit); REFRESH_VARIABLE_CHAR(eventslog_filename); REFRESH_VARIABLE_INT(auditlog_filesize); REFRESH_VARIABLE_CHAR(auditlog_filename); + REFRESH_VARIABLE_INT(auditlog_flush_timeout); + REFRESH_VARIABLE_INT(auditlog_flush_size); GloMyLogger->events_set_base_filename(); // both filename and filesize are set here GloMyLogger->audit_set_base_filename(); // both filename and filesize are set here REFRESH_VARIABLE_CHAR(default_schema); diff --git a/lib/PgSQL_Logger.cpp b/lib/PgSQL_Logger.cpp index e279059c4e..426bc659da 100644 --- a/lib/PgSQL_Logger.cpp +++ b/lib/PgSQL_Logger.cpp @@ -10,6 +10,7 @@ using json = nlohmann::json; #include "PgSQL_Query_Processor.h" #include "PgSQL_PreparedStatement.h" #include "PgSQL_Logger.hpp" +#include "log_utils.h" #include #include @@ -98,7 +99,7 @@ void PgSQL_Event::set_server(int _hid, const char *ptr, int len) { hid=_hid; } -uint64_t PgSQL_Event::write(std::fstream *f, PgSQL_Session *sess) { +uint64_t PgSQL_Event::write(LogBuffer *f, PgSQL_Session *sess) { uint64_t total_bytes=0; switch (et) { case PGSQL_LOG_EVENT_TYPE::SIMPLE_QUERY: @@ -132,17 +133,20 @@ uint64_t PgSQL_Event::write(std::fstream *f, PgSQL_Session *sess) { return total_bytes; } -void PgSQL_Event::write_auth(std::fstream *f, PgSQL_Session *sess) { +void PgSQL_Event::write_auth(LogBuffer *f, PgSQL_Session *sess) { json j = {}; j["timestamp"] = start_time/1000; { time_t timer=start_time/1000/1000; - struct tm* tm_info; - tm_info = localtime(&timer); + struct tm tm_info; char buffer1[36]; char buffer2[64]; - strftime(buffer1, 32, "%Y-%m-%d %H:%M:%S", tm_info); - sprintf(buffer2,"%s.%03u", buffer1, (unsigned)(start_time%1000000)/1000); + if (localtime_r(&timer, &tm_info)) { + strftime(buffer1, 32, "%Y-%m-%d %H:%M:%S", &tm_info); + sprintf(buffer2,"%s.%03u", buffer1, (unsigned)(start_time%1000000)/1000); + } else { + snprintf(buffer2, sizeof(buffer2), "invalid_date"); + } j["time"] = buffer2; } j["thread_id"] = thread_id; @@ -220,12 +224,15 @@ void PgSQL_Event::write_auth(std::fstream *f, PgSQL_Session *sess) { uint64_t timediff = curtime_mono - sess->start_time; uint64_t orig_time = curtime_real - timediff; time_t timer= (orig_time)/1000/1000; - struct tm* tm_info; - tm_info = localtime(&timer); + struct tm tm_info; char buffer1[36]; char buffer2[64]; - strftime(buffer1, 32, "%Y-%m-%d %H:%M:%S", tm_info); - sprintf(buffer2,"%s.%03u", buffer1, (unsigned)(orig_time%1000000)/1000); + if (localtime_r(&timer, &tm_info)) { + strftime(buffer1, 32, "%Y-%m-%d %H:%M:%S", &tm_info); + sprintf(buffer2,"%s.%03u", buffer1, (unsigned)(orig_time%1000000)/1000); + } else { + snprintf(buffer2, sizeof(buffer2), "invalid_date"); + } j["creation_time"] = buffer2; //unsigned long long life = sess->thread->curtime - sess->start_time; //life/=1000; @@ -250,10 +257,10 @@ void PgSQL_Event::write_auth(std::fstream *f, PgSQL_Session *sess) { // right before the write to disk //GloPgSQL_Logger->wrlock(); //move wrlock() function to log_audit_entry() function, avoid to get a null pointer in a multithreaded environment - *f << j.dump(-1, ' ', false, json::error_handler_t::replace) << std::endl; + *f << j.dump(-1, ' ', false, json::error_handler_t::replace) << '\n'; } -uint64_t PgSQL_Event::write_query_format_1(std::fstream *f) { +uint64_t PgSQL_Event::write_query_format_1(LogBuffer *f) { uint64_t total_bytes=0; total_bytes+=1; // et total_bytes+=encode_length(thread_id, NULL); @@ -359,7 +366,7 @@ uint64_t PgSQL_Event::write_query_format_1(std::fstream *f) { return total_bytes; } -uint64_t PgSQL_Event::write_query_format_2_json(std::fstream *f) { +uint64_t PgSQL_Event::write_query_format_2_json(LogBuffer *f) { json j = {}; uint64_t total_bytes=0; if (hid!=UINT64_MAX) { @@ -410,23 +417,29 @@ uint64_t PgSQL_Event::write_query_format_2_json(std::fstream *f) { j["starttime_timestamp_us"] = start_time; { time_t timer=start_time/1000/1000; - struct tm* tm_info; - tm_info = localtime(&timer); + struct tm tm_info; char buffer1[36]; char buffer2[64]; - strftime(buffer1, 32, "%Y-%m-%d %H:%M:%S", tm_info); - sprintf(buffer2,"%s.%06u", buffer1, (unsigned)(start_time%1000000)); + if (localtime_r(&timer, &tm_info)) { + strftime(buffer1, 32, "%Y-%m-%d %H:%M:%S", &tm_info); + sprintf(buffer2,"%s.%06u", buffer1, (unsigned)(start_time%1000000)); + } else { + snprintf(buffer2, sizeof(buffer2), "invalid_date"); + } j["starttime"] = buffer2; } j["endtime_timestamp_us"] = end_time; { time_t timer=end_time/1000/1000; - struct tm* tm_info; - tm_info = localtime(&timer); + struct tm tm_info; char buffer1[36]; char buffer2[64]; - strftime(buffer1, 32, "%Y-%m-%d %H:%M:%S", tm_info); - sprintf(buffer2,"%s.%06u", buffer1, (unsigned)(end_time%1000000)); + if (localtime_r(&timer, &tm_info)) { + strftime(buffer1, 32, "%Y-%m-%d %H:%M:%S", &tm_info); + sprintf(buffer2,"%s.%06u", buffer1, (unsigned)(end_time%1000000)); + } else { + snprintf(buffer2, sizeof(buffer2), "invalid_date"); + } j["endtime"] = buffer2; } j["duration_us"] = end_time-start_time; @@ -445,7 +458,7 @@ uint64_t PgSQL_Event::write_query_format_2_json(std::fstream *f) { //GloPgSQL_Logger->wrlock(); //move wrlock() function to log_request() function, avoid to get a null pointer in a multithreaded environment - *f << j.dump(-1, ' ', false, json::error_handler_t::replace) << std::endl; + *f << j.dump(-1, ' ', false, json::error_handler_t::replace) << '\n'; return total_bytes; // always 0 } @@ -467,13 +480,37 @@ PgSQL_Logger::PgSQL_Logger() { #endif events.logfile=NULL; events.log_file_id=0; + events.current_log_size=0; events.max_log_file_size=100*1024*1024; audit.logfile=NULL; audit.log_file_id=0; + audit.current_log_size=0; audit.max_log_file_size=100*1024*1024; }; PgSQL_Logger::~PgSQL_Logger() { + // Flush all per-thread buffers before destroying the logger + { + std::lock_guard lock(log_thread_contexts_lock); + for (const auto& kv : log_thread_contexts) { + LogBufferThreadContext* log_ctx = kv.second.get(); + if (!log_ctx->events.empty()) { + flush_and_rotate(log_ctx->events, events.logfile, events.current_log_size, events.max_log_file_size, + [this]() { wrlock(); }, + [this]() { wrunlock(); }, + nullptr + ); + } + if (!log_ctx->audit.empty()) { + flush_and_rotate(log_ctx->audit, audit.logfile, audit.current_log_size, audit.max_log_file_size, + [this]() { wrlock(); }, + [this]() { wrunlock(); }, + nullptr + ); + } + } + } + if (events.datadir) { free(events.datadir); } @@ -508,6 +545,21 @@ void PgSQL_Logger::flush_log() { wrunlock(); } +bool PgSQL_Logger::is_events_logfile_open() const { + return events.logfile_open.load(); +} + +void PgSQL_Logger::set_events_logfile_open(bool open) { + events.logfile_open.store(open); +} + +bool PgSQL_Logger::is_audit_logfile_open() const { + return audit.logfile_open.load(); +} + +void PgSQL_Logger::set_audit_logfile_open(bool open) { + audit.logfile_open.store(open); +} void PgSQL_Logger::events_close_log_unlocked() { if (events.logfile) { @@ -558,12 +610,16 @@ void PgSQL_Logger::events_open_log_unlocked() { events.logfile->exceptions ( std::ofstream::failbit | std::ofstream::badbit ); try { events.logfile->open(filen , std::ios::out | std::ios::binary); + events.current_log_size = 0; + set_events_logfile_open(true); proxy_info("Starting new pgsql event log file %s\n", filen); } catch (const std::ofstream::failure&) { proxy_error("Error creating new pgsql event log file %s\n", filen); delete events.logfile; events.logfile=NULL; + events.current_log_size = 0; + set_events_logfile_open(false); } free(filen); }; @@ -587,12 +643,16 @@ void PgSQL_Logger::audit_open_log_unlocked() { audit.logfile->exceptions ( std::ofstream::failbit | std::ofstream::badbit ); try { audit.logfile->open(filen , std::ios::out | std::ios::binary); + audit.current_log_size = 0; + set_audit_logfile_open(true); proxy_info("Starting new pgsql audit log file %s\n", filen); } catch (const std::ofstream::failure&) { proxy_error("Error creating new pgsql audit log file %s\n", filen); delete audit.logfile; audit.logfile=NULL; + audit.current_log_size = 0; + set_audit_logfile_open(false); } free(filen); }; @@ -659,11 +719,19 @@ void PgSQL_Logger::audit_set_datadir(char *s) { void PgSQL_Logger::log_request(PgSQL_Session *sess, PgSQL_Data_Stream *myds) { if (events.enabled==false) return; - if (events.logfile==NULL) return; + if (!is_events_logfile_open()) return; // 'PgSQL_Session::client_myds' could be NULL in case of 'RequestEnd' being called over a freshly created session // due to a failed 'CONNECTION_RESET'. Because this scenario isn't a client request, we just return. if (sess->client_myds==NULL || sess->client_myds->myconn== NULL) return; + // Obtain current thread's log ctx + LogBufferThreadContext* log_ctx = get_log_thread_context(); + + // Sample event logs. Set pgsql-eventslog_rate_limit=1 (default) to log all events + if (pgsql_thread___eventslog_rate_limit > 1) + if (!log_ctx->should_log(pgsql_thread___eventslog_rate_limit)) + return; + PgSQL_Connection_userinfo *ui=sess->client_myds->myconn->userinfo; uint64_t curtime_real=realtime_time(); @@ -782,17 +850,18 @@ void PgSQL_Logger::log_request(PgSQL_Session *sess, PgSQL_Data_Stream *myds) { // right before the write to disk //wrlock(); - //add a mutex lock in a multithreaded environment, avoid to get a null pointer of events.logfile that leads to the program coredump - GloPgSQL_Logger->wrlock(); - - me.write(events.logfile, sess); - - - unsigned long curpos=events.logfile->tellp(); - if (curpos > events.max_log_file_size) { - events_flush_log_unlocked(); - } - wrunlock(); + if (is_events_logfile_open()) { + me.write(&log_ctx->events, sess); + if (log_ctx->events.size() > static_cast(pgsql_thread___eventslog_flush_size)) { + //add a mutex lock in a multithreaded environment, avoid to get a null pointer of events.logfile that leads to the program coredump + flush_and_rotate(log_ctx->events, events.logfile, events.current_log_size, events.max_log_file_size, + [this]() { wrlock(); }, + [this]() { wrunlock(); }, + [this]() { events_flush_log_unlocked(); }, + monotonic_time() + ); + } + } if (cl && sess->client_myds->addr.port) { free(ca); @@ -804,11 +873,14 @@ void PgSQL_Logger::log_request(PgSQL_Session *sess, PgSQL_Data_Stream *myds) { void PgSQL_Logger::log_audit_entry(PGSQL_LOG_EVENT_TYPE _et, PgSQL_Session *sess, PgSQL_Data_Stream *myds, char *xi) { if (audit.enabled==false) return; - if (audit.logfile==NULL) return; + if (!is_audit_logfile_open()) return; if (sess == NULL) return; if (sess->client_myds == NULL) return; + // Obtain current thread's log ctx + LogBufferThreadContext* log_ctx = get_log_thread_context(); + PgSQL_Connection_userinfo *ui= NULL; if (sess) { if (sess->client_myds) { @@ -931,16 +1003,17 @@ void PgSQL_Logger::log_audit_entry(PGSQL_LOG_EVENT_TYPE _et, PgSQL_Session *sess // right before the write to disk //wrlock(); - //add a mutex lock in a multithreaded environment, avoid to get a null pointer of events.logfile that leads to the program coredump - GloPgSQL_Logger->wrlock(); - me.write(audit.logfile, sess); - - - unsigned long curpos=audit.logfile->tellp(); - if (curpos > audit.max_log_file_size) { - audit_flush_log_unlocked(); - } - wrunlock(); + if (is_audit_logfile_open()) { + me.write(&log_ctx->audit, sess); + if (log_ctx->audit.size() > static_cast(pgsql_thread___auditlog_flush_size)) { + //add a mutex lock in a multithreaded environment, avoid to get a null pointer of audit.logfile that leads to the program coredump + flush_and_rotate(log_ctx->audit, audit.logfile, audit.current_log_size, audit.max_log_file_size, + [this]() { wrlock(); }, + [this]() { wrunlock(); }, + [this]() { audit_flush_log_unlocked(); }, + monotonic_time()); + } + } if (cl && sess->client_myds->addr.port) { free(ca); @@ -951,14 +1024,44 @@ void PgSQL_Logger::log_audit_entry(PGSQL_LOG_EVENT_TYPE _et, PgSQL_Session *sess } void PgSQL_Logger::flush() { - wrlock(); - if (events.logfile) { - events.logfile->flush(); - } - if (audit.logfile) { - audit.logfile->flush(); - } - wrunlock(); + LogBufferThreadContext* log_ctx = get_log_thread_context(); + const uint64_t current_time = monotonic_time(); + + // eventslog + bool flush_eventslog = false; + if (is_events_logfile_open()) { + if (log_ctx->events.size() > 0 && + (current_time - log_ctx->events.get_last_flush_time()) > static_cast(pgsql_thread___eventslog_flush_timeout) * 1000) { + flush_and_rotate( + log_ctx->events, + events.logfile, + events.current_log_size, + events.max_log_file_size, + [this]() { wrlock(); }, + [this]() { wrunlock(); }, + [this]() { events_flush_log_unlocked(); }, + current_time + ); + } + } + + // auditlogs + bool flush_auditlog = false; + if (is_audit_logfile_open()) { + if (log_ctx->audit.size() > 0 && + (current_time - log_ctx->audit.get_last_flush_time()) > static_cast(pgsql_thread___auditlog_flush_timeout) * 1000) { + flush_and_rotate( + log_ctx->audit, + audit.logfile, + audit.current_log_size, + audit.max_log_file_size, + [this]() { wrlock(); }, + [this]() { wrunlock(); }, + [this]() { audit_flush_log_unlocked(); }, + current_time + ); + } + } } unsigned int PgSQL_Logger::events_find_next_id() { @@ -1054,6 +1157,9 @@ unsigned int PgSQL_Logger::audit_find_next_id() { } void PgSQL_Logger::print_version() { - fprintf(stderr,"Standard ProxySQL PgSQL Logger rev. %s -- %s -- %s\n", PROXYSQL_PGSQL_LOGGER_VERSION, __FILE__, __TIMESTAMP__); + fprintf(stderr,"Standard ProxySQL PgSQL Logger rev. %s -- %s -- %s\n", PROXYSQL_PGSQL_LOGGER_VERSION, __FILE__, __TIMESTAMP__); } +LogBufferThreadContext* PgSQL_Logger::get_log_thread_context() { + return GetLogBufferThreadContext(log_thread_contexts, log_thread_contexts_lock, monotonic_time()); +} diff --git a/lib/PgSQL_Thread.cpp b/lib/PgSQL_Thread.cpp index 001b63d844..318bf7cbc1 100644 --- a/lib/PgSQL_Thread.cpp +++ b/lib/PgSQL_Thread.cpp @@ -293,8 +293,13 @@ static char* pgsql_thread_variables_names[] = { (char*)"eventslog_filesize", (char*)"eventslog_default_log", (char*)"eventslog_format", + (char*)"eventslog_flush_timeout", + (char*)"eventslog_flush_size", + (char*)"eventslog_rate_limit", (char*)"auditlog_filename", (char*)"auditlog_filesize", + (char*)"auditlog_flush_timeout", + (char*)"auditlog_flush_size", //(char *)"default_charset", // removed in 2.0.13 . Obsoleted previously using MySQL_Variables instead (char*)"handle_unknown_charset", (char*)"free_connections_pct", @@ -1025,8 +1030,13 @@ PgSQL_Threads_Handler::PgSQL_Threads_Handler() { variables.eventslog_filesize = 100 * 1024 * 1024; variables.eventslog_default_log = 0; variables.eventslog_format = 1; + variables.eventslog_flush_timeout = 1000; + variables.eventslog_flush_size = 4096; + variables.eventslog_rate_limit = 1; variables.auditlog_filename = strdup((char*)""); variables.auditlog_filesize = 100 * 1024 * 1024; + variables.auditlog_flush_timeout = 1000; + variables.auditlog_flush_size = 4096; variables.poll_timeout = 2000; variables.poll_timeout_on_failure = 100; variables.have_compress = true; @@ -1648,6 +1658,8 @@ bool PgSQL_Threads_Handler::set_variable(char* name, const char* value) { // thi // if we are switching format, we need to switch file too if (GloPgSQL_Logger) { proxy_info("Switching query logging format from %d to %d\n", variables.eventslog_format, intv); + // write existing logs (if any) to file before switching the file + GloPgSQL_Logger->flush(); GloPgSQL_Logger->flush_log(); } variables.eventslog_format = intv; @@ -1658,6 +1670,63 @@ bool PgSQL_Threads_Handler::set_variable(char* name, const char* value) { // thi return false; } } + if (!strcasecmp(name,"eventslog_flush_timeout")) { + int intv=atoi(value); + if (intv >= 0) { + variables.eventslog_flush_timeout=intv; + if (intv > 5 * 60 * 1000) { + proxy_warning("pgsql-eventslog_flush_timeout is set to a high value: %dms\n", intv); + } + return true; + } else { + return false; + } + } + if (!strcasecmp(name,"eventslog_flush_size")) { + int intv=atoi(value); + if (intv >= 0) { + variables.eventslog_flush_size=intv; + if (intv > 10 * 1024 * 1024) { + proxy_warning("pgsql-eventslog_flush_size is set to a high value: %d\n", intv); + } + return true; + } else { + return false; + } + } + if (!strcasecmp(name,"eventslog_rate_limit")) { + int intv=atoi(value); + if (intv >= 1) { + variables.eventslog_rate_limit=intv; + return true; + } else { + return false; + } + } + if (!strcasecmp(name,"auditlog_flush_timeout")) { + int intv=atoi(value); + if (intv >= 0) { + variables.auditlog_flush_timeout=intv; + if (intv > 5 * 60 * 1000) { + proxy_warning("pgsql-auditlog_flush_timeout is set to a high value: %dms\n", intv); + } + return true; + } else { + return false; + } + } + if (!strcasecmp(name,"auditlog_flush_size")) { + int intv=atoi(value); + if (intv >= 0) { + variables.auditlog_flush_size=intv; + if (intv > 10 * 1024 * 1024) { + proxy_warning("pgsql-auditlog_flush_size is set to a high value: %d\n", intv); + } + return true; + } else { + return false; + } + } if (!strcasecmp(name, "default_schema")) { if (vallen) { free(variables.default_schema); @@ -2166,6 +2235,11 @@ char** PgSQL_Threads_Handler::get_variables_list() { // the input validation for these variables MUST be EXPLICIT VariablesPointers_int["binlog_reader_connect_retry_msec"] = make_tuple(&variables.binlog_reader_connect_retry_msec, 0, 0, true); VariablesPointers_int["eventslog_format"] = make_tuple(&variables.eventslog_format, 0, 0, true); + VariablesPointers_int["eventslog_flush_timeout"] = make_tuple(&variables.eventslog_flush_timeout, 0, 0, true); + VariablesPointers_int["eventslog_flush_size"] = make_tuple(&variables.eventslog_flush_size, 0, 0, true); + VariablesPointers_int["eventslog_rate_limit"] = make_tuple(&variables.eventslog_rate_limit, 0, 0, true); + VariablesPointers_int["auditlog_flush_timeout"] = make_tuple(&variables.auditlog_flush_timeout, 0, 0, true); + VariablesPointers_int["auditlog_flush_size"] = make_tuple(&variables.auditlog_flush_size, 0, 0, true); VariablesPointers_int["wait_timeout"] = make_tuple(&variables.wait_timeout, 0, 0, true); VariablesPointers_int["data_packets_history_size"] = make_tuple(&variables.data_packets_history_size, 0, 0, true); @@ -3911,10 +3985,15 @@ void PgSQL_Thread::refresh_variables() { pgsql_thread___eventslog_filesize = GloPTH->get_variable_int((char*)"eventslog_filesize"); pgsql_thread___eventslog_default_log = GloPTH->get_variable_int((char*)"eventslog_default_log"); pgsql_thread___eventslog_format = GloPTH->get_variable_int((char*)"eventslog_format"); + pgsql_thread___eventslog_flush_timeout = GloPTH->get_variable_int((char*)"eventslog_flush_timeout"); + pgsql_thread___eventslog_flush_size = GloPTH->get_variable_int((char*)"eventslog_flush_size"); + pgsql_thread___eventslog_rate_limit = GloPTH->get_variable_int((char*)"eventslog_rate_limit"); pgsql_thread___eventslog_filename = GloPTH->get_variable_string((char*)"eventslog_filename"); if (pgsql_thread___auditlog_filename) free(pgsql_thread___auditlog_filename); pgsql_thread___auditlog_filesize = GloPTH->get_variable_int((char*)"auditlog_filesize"); pgsql_thread___auditlog_filename = GloPTH->get_variable_string((char*)"auditlog_filename"); + pgsql_thread___auditlog_flush_timeout = GloPTH->get_variable_int((char*)"auditlog_flush_timeout"); + pgsql_thread___auditlog_flush_size = GloPTH->get_variable_int((char*)"auditlog_flush_size"); GloPgSQL_Logger->events_set_base_filename(); // both filename and filesize are set here GloPgSQL_Logger->audit_set_base_filename(); // both filename and filesize are set here diff --git a/lib/ProxySQL_Admin.cpp b/lib/ProxySQL_Admin.cpp index 151c3d0776..ccf825b741 100644 --- a/lib/ProxySQL_Admin.cpp +++ b/lib/ProxySQL_Admin.cpp @@ -1095,9 +1095,13 @@ void flush_logs_handler() { void ProxySQL_Admin::flush_logs() { if (GloMyLogger) { + // flush any buffered logs before flushing log file + GloMyLogger->flush(); GloMyLogger->flush_log(); } if (GloPgSQL_Logger) { + // flush any buffered logs before flushing log file + GloPgSQL_Logger->flush(); GloPgSQL_Logger->flush_log(); } this->flush_error_log(); diff --git a/lib/log_utils.cpp b/lib/log_utils.cpp new file mode 100644 index 0000000000..0550fb0be6 --- /dev/null +++ b/lib/log_utils.cpp @@ -0,0 +1,147 @@ +#include +#include +#include +#include "log_utils.h" + + +LogBuffer::LogBuffer() : last_flush_time(0) {} + +LogBuffer& LogBuffer::operator<<(const std::string& value) { + buffer.append(value); + return *this; +} + +LogBuffer& LogBuffer::operator<<(const char* value) { + buffer.append(value); + return *this; +} + +LogBuffer& LogBuffer::operator<<(char value) { + buffer.push_back(value); + return *this; +} + +LogBuffer& LogBuffer::append(const std::string& str) { + buffer.append(str); + return *this; +} + +LogBuffer& LogBuffer::append(const char* str) { + buffer.append(str); + return *this; +} + +LogBuffer& LogBuffer::append(const char* str, size_t len) { + buffer.append(str, len); + return *this; +} + +LogBuffer& LogBuffer::write(const std::string& str) { + buffer.append(str); + return *this; +} + +LogBuffer& LogBuffer::write(const char* str, size_t len) { + buffer.append(str, len); + return *this; +} + +void LogBuffer::reset(uint64_t flush_time) { + buffer.clear(); + last_flush_time = flush_time; +} + +void LogBuffer::set_last_flush_time(uint64_t flush_time) { + last_flush_time = flush_time; +} + +uint64_t LogBuffer::get_last_flush_time() const { + return last_flush_time; +} + +bool LogBuffer::empty() const { + return buffer.empty(); +} + +size_t LogBuffer::size() const { + return buffer.size(); +} + +const char* LogBuffer::data() const { + return buffer.data(); +} + +void LogBuffer::flush_to_file(std::fstream* logfile) { + if (!logfile || buffer.empty()) { + return; + } + logfile->write(buffer.data(), buffer.size()); +} + +bool flush_and_rotate( + LogBuffer& buffer, + std::fstream* logfile, + unsigned int& current_log_size, + unsigned int max_log_file_size, + std::function lock_fn, + std::function unlock_fn, + std::function rotate_fn, + uint64_t reset_time) +{ + bool flushed = false; + lock_fn(); + if (logfile) { + buffer.flush_to_file(logfile); + current_log_size += buffer.size(); + flushed = true; + if (current_log_size > max_log_file_size && rotate_fn) { + rotate_fn(); + current_log_size = 0; + } + logfile->flush(); + } + unlock_fn(); + if (flushed) { + buffer.reset(reset_time); + } + return flushed; +} + +LogBufferThreadContext::LogBufferThreadContext() : dist(0.0, 1.0) { + // Seed the Mersenne Twister with multiple entropy sources + std::random_device rd; + std::seed_seq seed{ + rd(), + static_cast(std::chrono::high_resolution_clock::now().time_since_epoch().count()), + static_cast(pthread_self()) + }; + rng.seed(seed); +} + +bool LogBufferThreadContext::should_log(int rate_limit) { + return dist(rng) * static_cast(rate_limit) <= 1.0; +} + +LogBufferThreadContext* GetLogBufferThreadContext(std::unordered_map>& log_thread_contexts, std::mutex& log_thread_contexts_lock, uint64_t current_time) { + pthread_t tid = pthread_self(); + { + std::lock_guard lock(log_thread_contexts_lock); + auto it = log_thread_contexts.find(tid); + if (it != log_thread_contexts.end()) { + return it->second.get(); + } + } + + // Context doesn't exist for this thread, create it with proper initialization + auto new_context = std::make_unique(); + LogBufferThreadContext* ptr = new_context.get(); + // init() is already called in the constructor, which initializes both events and audit buffers + ptr->events.set_last_flush_time(current_time); + ptr->audit.set_last_flush_time(current_time); + + { + std::lock_guard lock(log_thread_contexts_lock); + log_thread_contexts[tid] = std::move(new_context); + } + return ptr; +} From 091942e9d6d6c817fde64635d65febb4ebb29baf Mon Sep 17 00:00:00 2001 From: vramesha Date: Wed, 11 Feb 2026 14:37:47 +0530 Subject: [PATCH 2/3] Address AI review suggestions --- include/log_utils.h | 3 ++- lib/MySQL_Logger.cpp | 1 + lib/PgSQL_Logger.cpp | 2 -- lib/log_utils.cpp | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/include/log_utils.h b/include/log_utils.h index d06000395b..5d1932f852 100644 --- a/include/log_utils.h +++ b/include/log_utils.h @@ -121,7 +121,8 @@ class LogBuffer { /** * @brief Flushes the buffer to an output file stream. * - * Writes the entire contents of the buffer to the provided file stream and resets the buffer. + * Writes the entire contents of the buffer to the provided file stream. + * The caller is responsible for resetting the buffer after flushing. * * @param logfile The output file stream to write to. */ diff --git a/lib/MySQL_Logger.cpp b/lib/MySQL_Logger.cpp index 0e96150c45..88f01c012b 100644 --- a/lib/MySQL_Logger.cpp +++ b/lib/MySQL_Logger.cpp @@ -1355,6 +1355,7 @@ void MySQL_Logger::events_open_log_unlocked() { metaEvent.write(&log_ctx->events, nullptr); log_ctx->events.flush_to_file(events.logfile); events.current_log_size += log_ctx->events.size(); + log_ctx->events.reset(monotonic_time()); } } catch (const std::ofstream::failure&) { diff --git a/lib/PgSQL_Logger.cpp b/lib/PgSQL_Logger.cpp index 426bc659da..5e6f930540 100644 --- a/lib/PgSQL_Logger.cpp +++ b/lib/PgSQL_Logger.cpp @@ -1028,7 +1028,6 @@ void PgSQL_Logger::flush() { const uint64_t current_time = monotonic_time(); // eventslog - bool flush_eventslog = false; if (is_events_logfile_open()) { if (log_ctx->events.size() > 0 && (current_time - log_ctx->events.get_last_flush_time()) > static_cast(pgsql_thread___eventslog_flush_timeout) * 1000) { @@ -1046,7 +1045,6 @@ void PgSQL_Logger::flush() { } // auditlogs - bool flush_auditlog = false; if (is_audit_logfile_open()) { if (log_ctx->audit.size() > 0 && (current_time - log_ctx->audit.get_last_flush_time()) > static_cast(pgsql_thread___auditlog_flush_timeout) * 1000) { diff --git a/lib/log_utils.cpp b/lib/log_utils.cpp index 0550fb0be6..d78ac157f6 100644 --- a/lib/log_utils.cpp +++ b/lib/log_utils.cpp @@ -94,11 +94,11 @@ bool flush_and_rotate( buffer.flush_to_file(logfile); current_log_size += buffer.size(); flushed = true; + logfile->flush(); if (current_log_size > max_log_file_size && rotate_fn) { rotate_fn(); current_log_size = 0; } - logfile->flush(); } unlock_fn(); if (flushed) { From 71143c2e8457ec44d544f27b3a75467f7dad6991 Mon Sep 17 00:00:00 2001 From: vramesha Date: Fri, 13 Feb 2026 11:33:31 +0530 Subject: [PATCH 3/3] Address AI review comments - Added to log_utils.h and removed the header from log_utils.cpp - Pass 0 to reset_time argument in flush_and_rotate inside logger's destructor --- include/log_utils.h | 2 +- lib/MySQL_Logger.cpp | 6 ++++-- lib/PgSQL_Logger.cpp | 6 ++++-- lib/log_utils.cpp | 1 - 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/include/log_utils.h b/include/log_utils.h index 5d1932f852..29475deb60 100644 --- a/include/log_utils.h +++ b/include/log_utils.h @@ -9,7 +9,7 @@ #include #include #include - +#include /** * @brief Manages a string buffer and a flush timestamp for logging. diff --git a/lib/MySQL_Logger.cpp b/lib/MySQL_Logger.cpp index 88f01c012b..58087fce30 100644 --- a/lib/MySQL_Logger.cpp +++ b/lib/MySQL_Logger.cpp @@ -1218,14 +1218,16 @@ MySQL_Logger::~MySQL_Logger() { flush_and_rotate(log_ctx->events, events.logfile, events.current_log_size, events.max_log_file_size, [this]() { wrlock(); }, [this]() { wrunlock(); }, - nullptr + nullptr, + 0 ); } if (!log_ctx->audit.empty()) { flush_and_rotate(log_ctx->audit, audit.logfile, audit.current_log_size, audit.max_log_file_size, [this]() { wrlock(); }, [this]() { wrunlock(); }, - nullptr + nullptr, + 0 ); } } diff --git a/lib/PgSQL_Logger.cpp b/lib/PgSQL_Logger.cpp index 5e6f930540..3668d74558 100644 --- a/lib/PgSQL_Logger.cpp +++ b/lib/PgSQL_Logger.cpp @@ -498,14 +498,16 @@ PgSQL_Logger::~PgSQL_Logger() { flush_and_rotate(log_ctx->events, events.logfile, events.current_log_size, events.max_log_file_size, [this]() { wrlock(); }, [this]() { wrunlock(); }, - nullptr + nullptr, + 0 ); } if (!log_ctx->audit.empty()) { flush_and_rotate(log_ctx->audit, audit.logfile, audit.current_log_size, audit.max_log_file_size, [this]() { wrlock(); }, [this]() { wrunlock(); }, - nullptr + nullptr, + 0 ); } } diff --git a/lib/log_utils.cpp b/lib/log_utils.cpp index d78ac157f6..319fd84b05 100644 --- a/lib/log_utils.cpp +++ b/lib/log_utils.cpp @@ -1,6 +1,5 @@ #include #include -#include #include "log_utils.h"