Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 29 additions & 3 deletions src/drivers/pgsql/drv_pgsql.c
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#endif

#include <libpq-fe.h>
#include <stdatomic.h>

#include "sb_options.h"
#include "db_driver.h"
Expand All @@ -47,6 +48,7 @@ static sb_arg_t pgsql_drv_args[] =
SB_OPT("pgsql-user", "PostgreSQL user", "sbtest", STRING),
SB_OPT("pgsql-password", "PostgreSQL password", "", STRING),
SB_OPT("pgsql-db", "PostgreSQL database name", "sbtest", STRING),
SB_OPT("conn-create-count", "Number of connections to be created in one go. To reduce the overhead on YB master", "10", INT),

SB_OPT_END
};
Expand Down Expand Up @@ -111,8 +113,16 @@ static pgsql_drv_args_t args; /* driver args */

static char use_ps; /* whether server-side prepared statemens should be used */

/* PgSQL driver operations */
typedef volatile struct {
volatile atomic_int conn_count;
volatile atomic_flag lock;
} conn_sem_t;
conn_sem_t conn_sem;

#define acquire_lock(l) while (atomic_flag_test_and_set(l))
#define release_lock(l) atomic_flag_clear(l)

/* PgSQL driver operations */

static char** str_split(char *src, char sep, int *numparts);
static int pgsql_drv_init(void);
Expand All @@ -131,6 +141,8 @@ static db_error_t pgsql_drv_query(db_conn_t *, const char *, size_t,
static int pgsql_drv_free_results(db_result_t *);
static int pgsql_drv_close(db_stmt_t *);
static int pgsql_drv_done(void);
static void conn_create_wait(conn_sem_t * s);
static int conn_create_signal(conn_sem_t * s);

/* PgSQL driver definition */

Expand Down Expand Up @@ -188,12 +200,14 @@ int pgsql_drv_init(void)
args.user = sb_get_value_string("pgsql-user");
args.password = sb_get_value_string("pgsql-password");
args.db = sb_get_value_string("pgsql-db");
int conn_count = sb_get_value_int("conn-create-count");

use_ps = 0;
pgsql_drv_caps.prepared_statements = 1;
if (db_globals.ps_mode != DB_PS_MODE_DISABLE)
use_ps = 1;

atomic_init(&conn_sem.conn_count, conn_count);

return 0;
}

Expand Down Expand Up @@ -241,6 +255,7 @@ static void empty_notice_processor(void *arg, const char *msg)
int pgsql_drv_connect(db_conn_t *sb_conn)
{
PGconn *con;
conn_create_wait(&conn_sem);
int hostindex = sb_conn->thread_id % args.numhosts;

con = PQsetdbLogin(args.hosts[hostindex],
Expand All @@ -262,7 +277,7 @@ int pgsql_drv_connect(db_conn_t *sb_conn)
/* Silence the default notice receiver spitting NOTICE message to stderr */
PQsetNoticeProcessor(con, empty_notice_processor, NULL);
sb_conn->ptr = con;

conn_create_signal(&conn_sem);
return 0;
}

Expand Down Expand Up @@ -874,3 +889,14 @@ int get_unique_stmt_name(char *name, int len)
(int) sb_rand_uniform_uint64(),
(int) sb_rand_uniform_uint64());
}

void conn_create_wait(conn_sem_t * sem) {
acquire_lock(&sem->lock);
while (atomic_load(&sem->conn_count) <= 0);
atomic_fetch_sub(&sem->conn_count, 1);
release_lock(&sem->lock);
}

int conn_create_signal(conn_sem_t * sem) {
return atomic_fetch_add(&sem->conn_count, 1);
}