diff --git a/src/drivers/pgsql/drv_pgsql.c b/src/drivers/pgsql/drv_pgsql.c index 437f1b5c5..90e4ccb7f 100644 --- a/src/drivers/pgsql/drv_pgsql.c +++ b/src/drivers/pgsql/drv_pgsql.c @@ -28,6 +28,7 @@ #endif #include +#include #include "sb_options.h" #include "db_driver.h" @@ -47,6 +48,7 @@ static sb_arg_t pgsql_drv_args[] = SB_OPT("pgsql-user", "PostgreSQL user", "sbtest", STRING), SB_OPT("pgsql-password", "PostgreSQL password", "", STRING), SB_OPT("pgsql-db", "PostgreSQL database name", "sbtest", STRING), + SB_OPT("conn-create-count", "Number of connections to be created in one go. To reduce the overhead on YB master", "10", INT), SB_OPT_END }; @@ -111,8 +113,16 @@ static pgsql_drv_args_t args; /* driver args */ static char use_ps; /* whether server-side prepared statemens should be used */ -/* PgSQL driver operations */ +typedef volatile struct { + volatile atomic_int conn_count; + volatile atomic_flag lock; +} conn_sem_t; +conn_sem_t conn_sem; + +#define acquire_lock(l) while (atomic_flag_test_and_set(l)) +#define release_lock(l) atomic_flag_clear(l) +/* PgSQL driver operations */ static char** str_split(char *src, char sep, int *numparts); static int pgsql_drv_init(void); @@ -131,6 +141,8 @@ static db_error_t pgsql_drv_query(db_conn_t *, const char *, size_t, static int pgsql_drv_free_results(db_result_t *); static int pgsql_drv_close(db_stmt_t *); static int pgsql_drv_done(void); +static void conn_create_wait(conn_sem_t * s); +static int conn_create_signal(conn_sem_t * s); /* PgSQL driver definition */ @@ -188,12 +200,14 @@ int pgsql_drv_init(void) args.user = sb_get_value_string("pgsql-user"); args.password = sb_get_value_string("pgsql-password"); args.db = sb_get_value_string("pgsql-db"); + int conn_count = sb_get_value_int("conn-create-count"); use_ps = 0; pgsql_drv_caps.prepared_statements = 1; if (db_globals.ps_mode != DB_PS_MODE_DISABLE) use_ps = 1; - + atomic_init(&conn_sem.conn_count, conn_count); + return 0; } @@ -241,6 +255,7 @@ static void empty_notice_processor(void *arg, const char *msg) int pgsql_drv_connect(db_conn_t *sb_conn) { PGconn *con; + conn_create_wait(&conn_sem); int hostindex = sb_conn->thread_id % args.numhosts; con = PQsetdbLogin(args.hosts[hostindex], @@ -262,7 +277,7 @@ int pgsql_drv_connect(db_conn_t *sb_conn) /* Silence the default notice receiver spitting NOTICE message to stderr */ PQsetNoticeProcessor(con, empty_notice_processor, NULL); sb_conn->ptr = con; - + conn_create_signal(&conn_sem); return 0; } @@ -874,3 +889,14 @@ int get_unique_stmt_name(char *name, int len) (int) sb_rand_uniform_uint64(), (int) sb_rand_uniform_uint64()); } + +void conn_create_wait(conn_sem_t * sem) { + acquire_lock(&sem->lock); + while (atomic_load(&sem->conn_count) <= 0); + atomic_fetch_sub(&sem->conn_count, 1); + release_lock(&sem->lock); +} + +int conn_create_signal(conn_sem_t * sem) { + return atomic_fetch_add(&sem->conn_count, 1); +} \ No newline at end of file